package com.bytecub.udp.service.network.impl;


import com.bytecub.common.biz.TopicBiz;
import com.bytecub.common.domain.gateway.mq.DeviceActiveMqBo;
import com.bytecub.common.domain.gateway.mq.DeviceUpMessageBo;
import com.bytecub.common.enums.BCErrorEnum;
import com.bytecub.common.exception.BCGException;
import com.bytecub.common.metadata.ProductFuncTypeEnum;
import com.bytecub.gateway.mq.excutor.DeviceMessageUpExecutor;
import com.bytecub.gateway.mq.producer.DeviceActiveMqProducer;
import com.bytecub.protocol.base.IBaseProtocol;
import com.bytecub.udp.domain.bo.ParseDeviceBo;
import com.bytecub.udp.service.parser.DownMessageParser;
import com.bytecub.udp.service.parser.UdpAuthParser;
import com.bytecub.udp.service.parser.UdpProtocolParser;
import com.bytecub.utils.SpringContextUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Date;


/**
 *  * ByteCub.cn.
 *  * Copyright (c) 2020-2021 All Rights Reserved.
 *  * 
 *  * @author bytecub@163.com  songbin
 *  * @Date 2021/4/2  Exp $$
 *  
 */

public class BcUdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    private Logger logger = LoggerFactory.getLogger(BcUdpServerHandler.class);
    private UdpAuthParser authParser;
    private UdpProtocolParser protocolParser;
    private DeviceMessageUpExecutor messageUpExecutor;
    private DownMessageParser downMessageParser;

    public BcUdpServerHandler(UdpAuthParser authParser,
                              UdpProtocolParser protocolParser,
                              DeviceMessageUpExecutor deviceMessageUpExecutor,
                              DownMessageParser downMessageParser){
        this.authParser = authParser;
        this.protocolParser = protocolParser;
        this.downMessageParser = downMessageParser;
        this.messageUpExecutor = deviceMessageUpExecutor;
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        // 接受client的消息
        try{
            final ByteBuf buf = msg.content();
            int readableBytes = buf.readableBytes();
            byte[] content = new byte[readableBytes];
            buf.readBytes(content);
            this.processAuth(content);
            ParseDeviceBo parseDeviceBo = protocolParser.parseProtocol(content);
            int port = msg.sender().getPort();
            String host = msg.sender().getAddress().getHostAddress();

            this.deviceActive(parseDeviceBo.getDeviceCode(), host, port);
            this.processPropReport(parseDeviceBo, content);
            this.processEventReport(parseDeviceBo, content);
            byte[] reply = downMessageParser.encode(parseDeviceBo.getDeviceCode());
            ctx.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(reply),msg.sender()));

        }catch (BCGException e){
            logger.warn("{}", e.getExtraMsg(), e);

        }catch (Exception e){
            logger.warn("", e);
        }

    }
    /**
     * 设备在线处理
     * */
    private void deviceActive(String deviceCode, String host, int port){
        try{
            DeviceActiveMqBo bo = new DeviceActiveMqBo();
            bo.setActive(true);
            bo.setDeviceCode(deviceCode);
            bo.setTimestamp(new Date());
            bo.setHost(host);
            bo.setPort(port);
            DeviceActiveMqProducer.send(bo);
        }catch (Exception e){
            logger.warn("UDP process device({}) online exception ", deviceCode, e);
        }
    }

    private boolean processAuth(byte[] content){
        if(!authParser.verify(content)){
            throw BCGException.genException(BCErrorEnum.INNER_EXCEPTION, "设备未授权");
        }
        return true;
    }

    private void processPropReport(ParseDeviceBo parseDeviceBo, byte[] content){
        try{
            String deviceCode = parseDeviceBo.getDeviceCode();
            String topic = TopicBiz.buildPropertyReport(deviceCode, parseDeviceBo.getProductCode());
            if(parseDeviceBo.getSubFlag()){
                topic = TopicBiz.buildPropertyReportSub(deviceCode, parseDeviceBo.getProductCode());
            }
            DeviceUpMessageBo deviceUpMessageBo = new DeviceUpMessageBo();
            deviceUpMessageBo.setDeviceCode(deviceCode);
            deviceUpMessageBo.setFuncType(ProductFuncTypeEnum.PROP);
            deviceUpMessageBo.setProductCode(parseDeviceBo.getProductCode());
            deviceUpMessageBo.setSourceMsg(content);
            deviceUpMessageBo.setTopic(topic);
            messageUpExecutor.execute(deviceUpMessageBo, true);
        }catch (Exception e){
            throw BCGException.genException(BCErrorEnum.INNER_EXCEPTION, "处理上报数据异常");
        }
    }

    private void processEventReport(ParseDeviceBo parseDeviceBo, byte[] content){
        try{
            String deviceCode = parseDeviceBo.getDeviceCode();
            String topic = TopicBiz.buildEventReport(deviceCode, parseDeviceBo.getProductCode());
            if(parseDeviceBo.getSubFlag()){
                topic = TopicBiz.buildEventReportSub(deviceCode, parseDeviceBo.getProductCode());
            }
            DeviceUpMessageBo deviceUpMessageBo = new DeviceUpMessageBo();
            deviceUpMessageBo.setDeviceCode(deviceCode);
            deviceUpMessageBo.setFuncType(ProductFuncTypeEnum.EVENT);
            deviceUpMessageBo.setProductCode(parseDeviceBo.getProductCode());
            deviceUpMessageBo.setSourceMsg(content);
            deviceUpMessageBo.setTopic(topic);
            messageUpExecutor.execute(deviceUpMessageBo, true);
        }catch (Exception e){
            throw BCGException.genException(BCErrorEnum.INNER_EXCEPTION, "处理上报数据异常");
        }
    }
}
